#!/usr/bin/env python3
# Copyright 2019-2022 Alibaba Group Holding Limited.
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause

import subprocess
import sys
import sh
import os
from glob import glob
import colorlog
import logging

handler = logging.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
    '%(cyan)s%(asctime)s%(reset)s %(log_color)s%(levelname)s %(white)s%(message)s%(reset)s',
    datefmt='%Y-%m-%d %H:%M:%S'))
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(handler)

class TestSchedSyscall:
    def setup_class(self):
        print("Sched syscall test")
        cmd = "while :; do :; done"
        self.child = subprocess.Popen(cmd, shell=True)
        self.rt_runtime_us = int(sh.sysctl('kernel.sched_rt_runtime_us').split()[-1])

    def load_scheduler(self):
        scheduler_rpm = glob(os.path.join('/tmp/work', 'scheduler*.rpm'))
        if len(scheduler_rpm) != 1:
            print("Please check your scheduler rpm");
            self.teardown_class()
            sys.exit(1)
        scheduler_rpm = scheduler_rpm[0]
        sh.rpm('-ivh', scheduler_rpm)

    def test_cpuset(self):
        fa_mems = sh.cat("/sys/fs/cgroup/cpuset/cpuset.mems").split()[0]
        fa_cpus = sh.cat("/sys/fs/cgroup/cpuset/cpuset.cpus").split()[0]
        sh.mkdir("/sys/fs/cgroup/cpuset/test")
        self.load_scheduler()
        sh.echo(fa_mems, _out="/sys/fs/cgroup/cpuset/test/cpuset.mems")
        sh.echo(fa_cpus, _out="/sys/fs/cgroup/cpuset/test/cpuset.cpus")
        ch_mems = sh.cat("/sys/fs/cgroup/cpuset/test/cpuset.mems").split()[0]
        ch_cpus = sh.cat("/sys/fs/cgroup/cpuset/test/cpuset.cpus").split()[0]
        if fa_mems != ch_mems or fa_cpus != ch_cpus:
            self.error_handler()
        self.remove_file()
        
    def test_policy_and_prio(self):
        sh.sysctl('-w', 'kernel.sched_rt_runtime_us=-1')
        sh.chrt('-p', '-f', 10, self.child.pid)
        res = sh.chrt('-p', self.child.pid).split('\n')
        if res[0].split()[-1] != 'SCHED_FIFO' or res[1].split()[-1] != '10':
            logging.info("class=%s prio=%s", res[0], res[1])
            self.error_handler()

    def test_all(self):
        self.test_cpuset()
        self.test_policy_and_prio()

    def error_handler(self):
        self.child.kill()
        self.child.wait()
        self.remove_file()
        self.unload_scheduler()
        sh.sysctl('-w', 'kernel.sched_rt_runtime_us={}'.format(self.rt_runtime_us))
        sys.exit(1)

    def remove_file(self):
        if os.path.exists("/sys/fs/cgroup/cpuset/test"):
            sh.rmdir("/sys/fs/cgroup/cpuset/test")

    def unload_scheduler(self):
        tmp = subprocess.Popen("lsmod | grep scheduler", shell=True, stdout=subprocess.PIPE)
        if tmp.stdout.read() != b'':
            sh.rpm('-e', 'scheduler-xxx')

    def teardown_class(self):
        self.child.kill()
        self.child.wait()
        self.unload_scheduler()
        sh.sysctl('-w', 'kernel.sched_rt_runtime_us={}'.format(self.rt_runtime_us))

if __name__ == '__main__':
    test_unit = TestSchedSyscall()
    test_unit.setup_class()
    test_unit.test_all()
    test_unit.teardown_class()

